package com.wang.returns;

import com.rabbitmq.client.*;

import com.wang.utils.RabbitMqUtil;

import java.io.IOException;


/**
 * @BelongsProject: RabbitMqLearn
 * @BelongsPackage: com.wang
 * @Author: wang fei
 * @CreateTime: 2023-02-03  15:58
 * @Description: TODO rabbitmq-topics模式 生产者  通过confirm机制保证⽣产者消息能够投递到MQ  通过return机制保证消息在rabbitmq中能够 成功的投递到队列⾥
 * @Version: 1.0
 */
public class MyProducer {
    //定义交换机名称
    public  static String EXCHANGE_NAME = "my_topic_exchange";

    public static void main(String[] args) throws Exception {
        Connection connection=RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        //声名交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //开启confirm机制
        channel.confirmSelect();
        //设置confrim的监听器
        channel.addConfirmListener(new ConfirmListener() {
            //当消息发送成功将会执⾏这⾥的⽅法
            @Override
            public void handleAck(long l, boolean b) throws IOException {
                System.out.println("消息已经成功投递");
            }
            //当消息发送失败会执⾏这⾥的⽅法，通过重试机制，进⾏重新投递，如果重新投递的次数达到阈值，那么就需要⼈⼯介⼊
            @Override
            public void handleNack(long l, boolean b) throws IOException {
                System.out.println("消息投递失败");
            }
        });

        // 开启return机制
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String
                    replyText, String exchange, String routingKey,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
                //如果消息没有成功抵达队列，此⽅法将会被调⽤
                System.out.println("消息没有抵达队列");
            }
        });

        //发送信息
        String message = "topics模式!";
        channel.basicPublish(EXCHANGE_NAME, "product.add.one", true,null, message.getBytes());
        System.out.println("消息发送成功");
        //断开链接
        channel.close();
        connection.close();
    }

}
